/*
 * Copyright (C) 2010 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses the same ordering rules as class
 * {@link PriorityQueue} and supplies blocking retrieval operations. While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>). This class does not permit <tt>null</tt> elements. A priority queue
 * relying on {@linkplain Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in <tt>ClassCastException</tt>).
 *
 * <p>
 * This class and its iterator implement all of the <em>optional</em> methods of the
 * {@link Collection} and {@link Iterator} interfaces. The Iterator provided in method
 * {@link #iterator()} is <em>not</em> guaranteed to traverse the elements of the
 * MonitorBasedPriorityBlockingQueue in any particular order. If you need ordered traversal,
 * consider using <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt> can be used to
 * <em>remove</em> some or all elements in priority order and place them in another collection.
 *
 * <p>
 * Operations on this class make no guarantees about the ordering of elements with equal priority.
 * If you need to enforce an ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values. For example, here is a class that applies
 * first-in-first-out tie-breaking to comparable elements. To use it, you would insert a
 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
 *
 * <pre>
 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt; implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
 *     final static AtomicLong seq = new AtomicLong();
 *     final long seqNum;
 *     final E entry;
 * 
 *     public FIFOEntry(E entry) {
 *         seqNum = seq.getAndIncrement();
 *         this.entry = entry;
 *     }
 * 
 *     public E getEntry() {
 *         return entry;
 *     }
 * 
 *     public int compareTo(FIFOEntry&lt;E&gt; other) {
 *         int res = entry.compareTo(other.entry);
 *         if (res == 0 &amp;&amp; other.entry != this.entry)
 *             res = (seqNum &lt; other.seqNum ? -1 : 1);
 *         return res;
 *     }
 * }
 * </pre>
 *
 * @author Doug Lea
 * @author Justin T. Sampson
 * @param <E> the type of elements held in this collection
 */
@CanIgnoreReturnValue // TODO(cpovirk): Consider being more strict.
public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {

    // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from
    // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/

    // NOTE(review): the class declaration does not list java.io.Serializable, so this constant and
    // the writeObject hook at the bottom of the class appear to be unused -- confirm before relying
    // on serialization of this queue.
    private static final long serialVersionUID = 5595510919245408276L;

    /** The backing heap. Every access that depends on its contents happens inside the monitor. */
    final PriorityQueue<E> q;

    /** The monitor that serializes access to {@link #q}. */
    final Monitor monitor = new Monitor(true);

    /** Guard that is satisfied while the backing queue holds at least one element. */
    private final Monitor.Guard notEmpty = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return !q.isEmpty();
        }
    };

    /**
     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default initial capacity (11)
     * that orders its elements according to their {@linkplain Comparable natural ordering}.
     */
    public MonitorBasedPriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial capacity that
     * orders its elements according to their {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less than 1
     */
    public MonitorBasedPriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity);
    }

    /**
     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial capacity that
     * orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param comparator the comparator that will be used to order this priority queue. If
     *        {@code null}, the {@linkplain Comparable natural ordering} of the elements will be
     *        used.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less than 1
     */
    public MonitorBasedPriorityBlockingQueue(
            int initialCapacity, @Nullable Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements in the specified
     * collection. If the specified collection is a {@link SortedSet} or a {@link PriorityQueue},
     * this priority queue will be ordered according to the same ordering. Otherwise, this priority
     * queue will be ordered according to the {@linkplain Comparable natural ordering} of its
     * elements.
     *
     * @param c the collection whose elements are to be placed into this priority queue
     * @throws ClassCastException if elements of the specified collection cannot be compared to one
     *         another according to the priority queue's ordering
     * @throws NullPointerException if the specified collection or any of its elements are null
     */
    public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared with elements
     *         currently in the priority queue according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared with elements
     *         currently in the priority queue according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        monitor.enter();
        try {
            // The backing queue is unbounded, so a rejected insertion would be an internal error.
            if (!q.offer(e)) {
                throw new AssertionError();
            }
            return true;
        } finally {
            monitor.leave();
        }
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is unbounded this method
     * will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared with elements
     *         currently in the priority queue according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is unbounded this method
     * will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is only checked for {@code null}; its value is otherwise ignored
     *        as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared with elements
     *         currently in the priority queue according to the priority queue's ordering
     * @throws NullPointerException if the specified element or {@code unit} is null
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        checkNotNull(unit);
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    @Override
    public E poll() {
        monitor.enter();
        try {
            return q.poll();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Retrieves and removes the head of this queue, entering the monitor only once the
     * {@code notEmpty} guard is satisfied.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        monitor.enterWhen(notEmpty);
        try {
            return q.poll();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the given time for the
     * {@code notEmpty} guard to become satisfied.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the monitor could not be entered with a
     *         non-empty queue within the given time
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        if (!monitor.enterWhen(notEmpty, timeout, unit)) {
            // Monitor was not entered, so there is nothing to leave.
            return null;
        }
        try {
            return q.poll();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    @Override
    public E peek() {
        monitor.enter();
        try {
            return q.peek();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue, or <tt>null</tt> if this
     * queue uses the {@linkplain Comparable natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue, or <tt>null</tt> if this
     *         queue uses the natural ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        monitor.enter();
        try {
            return q.size();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because a
     * <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue, if it is present. More
     * formally, removes an element {@code e} such that {@code o.equals(e)}, if this queue contains
     * one or more such elements. Returns {@code true} if and only if this queue contained the
     * specified element (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    @Override
    public boolean remove(@Nullable Object o) {
        monitor.enter();
        try {
            return q.remove(o);
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element. More formally, returns
     * {@code true} if and only if this queue contains at least one element {@code e} such that
     * {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    @Override
    public boolean contains(@Nullable Object o) {
        monitor.enter();
        try {
            return q.contains(o);
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue. The returned array elements
     * are in no particular order.
     *
     * <p>
     * The returned array will be "safe" in that no references to it are maintained by this queue.
     * (In other words, this method must allocate a new array). The caller is thus free to modify
     * the returned array.
     *
     * <p>
     * This method acts as bridge between array-based and collection-based APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        monitor.enter();
        try {
            return q.toArray();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns the string representation of the backing queue, computed while holding the monitor.
     *
     * @return a string representation of this queue's current contents
     */
    @Override
    public String toString() {
        monitor.enter();
        try {
            return q.toString();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Removes all available elements from this queue, in priority order, and adds them to the
     * given collection. Equivalent to {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements from this queue, in priority order, and adds
     * them to the given collection. Each element is added to {@code c} <em>before</em> it is
     * removed from this queue, so if {@code c.add} throws, the element that could not be
     * transferred remains in this queue rather than being lost.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        monitor.enter();
        try {
            int drained = 0;
            while (drained < maxElements) {
                E head = q.peek();
                if (head == null) {
                    break; // queue exhausted
                }
                c.add(head); // in this order, in case add() throws
                q.poll();
                drained++;
            }
            return drained;
        } finally {
            monitor.leave();
        }
    }

    /**
     * Atomically removes all of the elements from this queue. The queue will be empty after this
     * call returns.
     */
    @Override
    public void clear() {
        monitor.enter();
        try {
            q.clear();
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the runtime type of the
     * returned array is that of the specified array. The returned array elements are in no
     * particular order. If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the specified array and the size
     * of this queue.
     *
     * <p>
     * If this queue fits in the specified array with room to spare (i.e., the array has more
     * elements than this queue), the element in the array immediately following the end of the
     * queue is set to <tt>null</tt>.
     *
     * <p>
     * Like the {@link #toArray()} method, this method acts as bridge between array-based and
     * collection-based APIs. Further, this method allows precise control over the runtime type of
     * the output array, and may, under certain circumstances, be used to save allocation costs.
     *
     * <p>
     * Suppose <tt>x</tt> is a queue known to contain only strings. The following code can be used
     * to dump the queue into a newly allocated array of <tt>String</tt>:
     *
     * <pre>
     * String[] y = x.toArray(new String[0]);
     * </pre>
     *
     * <p>
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to be stored, if it is big
     *        enough; otherwise, a new array of the same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array is not a supertype of
     *         the runtime type of every element in this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        monitor.enter();
        try {
            return q.toArray(a);
        } finally {
            monitor.leave();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator does not return the
     * elements in any particular order. The returned <tt>Iterator</tt> is a "weakly consistent"
     * iterator that will never throw {@link ConcurrentModificationException}, and guarantees to
     * traverse elements as they existed upon construction of the iterator, and may (but is not
     * guaranteed to) reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor; // index of next element to return;
        int lastRet; // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        public E next() {
            if (cursor >= array.length) {
                throw new NoSuchElementException();
            }
            lastRet = cursor;

            // array comes from q.toArray() and so should have only E's in it
            @SuppressWarnings("unchecked")
            E e = (E) array[cursor++];
            return e;
        }

        /**
         * Removes from the live queue the exact instance last returned by {@link #next()}, if it
         * is still present there.
         *
         * @throws IllegalStateException if {@code next()} has not been called, or {@code remove()}
         *         was already called since the last call to {@code next()}
         */
        @Override
        public void remove() {
            if (lastRet < 0) {
                throw new IllegalStateException();
            }
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            monitor.enter();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext();) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                monitor.leave();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it). This merely wraps default serialization
     * within the monitor. The serialization strategy for items is left to underlying Queue. Note
     * that locking is not needed on deserialization, so readObject is not defined, just relying on
     * default.
     */
    private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
        monitor.enter();
        try {
            s.defaultWriteObject();
        } finally {
            monitor.leave();
        }
    }

}
